This report presents a price action trading algorithm that operates on the 15-minute timeframe. It combines candlestick patterns (Engulfing and Doji) with support and resistance levels, and it reaches a win rate of about 70% in the backtest. The sections below walk through how it works and how it was backtested.
We begin by importing the necessary packages for data processing and visualizing the results and plots.
import pandas as pd
import pandas_ta as ta
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from datetime import datetime
csv_path = "Downloads/Data_2020-2022_wExpiry.csv"
columns = ['Date', 'ExpiryDate', 'Time', 'Open', 'High', 'Low', 'Close']
df_1 = pd.read_csv(csv_path, header=None, names=columns)
df_1 = df_1.dropna()
df_1.tail()
| | Date | ExpiryDate | Time | Open | High | Low | Close |
|---|---|---|---|---|---|---|---|
| 277870 | 20221230 | 20230125 | 1526 | 23110.54002 | 23144.45146 | 23108.56403 | 23135.66988 |
| 277871 | 20221230 | 20230125 | 1527 | 23133.20795 | 23134.09097 | 23110.70025 | 23111.47473 |
| 277872 | 20221230 | 20230125 | 1528 | 23111.82193 | 23121.38728 | 23104.82660 | 23121.38728 |
| 277873 | 20221230 | 20230125 | 1529 | 23118.95512 | 23134.30505 | 23111.82193 | 23122.91099 |
| 277874 | 20221230 | 20230125 | 1530 | 23122.45653 | 23132.08420 | 23120.63887 | 23130.66629 |
# Preserve the original dataset
dfc = df_1.copy()
# Convert 'Date' column to a datetime format
dfc['Date'] = pd.to_datetime(dfc['Date'], format='%Y%m%d')
# Extract hour and minute information from the 'Time' column
dfc['Hour'] = dfc['Time'] // 100
dfc['Minute'] = dfc['Time'] % 100
# Create a new datetime column combining date, hour, and minute
dfc['DateTime'] = pd.to_datetime(dfc[['Date', 'Hour', 'Minute']].astype(str).agg('-'.join, axis=1), format='%Y-%m-%d-%H-%M')
# Set 'DateTime' column as the index
dfc.set_index('DateTime', inplace=True)
def df_tf(t):
    # Resample to the desired interval: first open, highest high, lowest low, last close
    resampled = dfc.resample(t, closed='right').agg({'Open': 'first', 'High': 'max', 'Low': 'min', 'Close': 'last'})
    # Drop intervals that contain no data
    resampled.dropna(subset=['Close'], inplace=True)
    # Reset the index to make 'DateTime' a regular column again
    resampled.reset_index(inplace=True)
    return resampled
df = df_tf('15min')
# Bins are labelled by their left edge; shift by 15 minutes so each candle carries its closing time
df['DateTime'] = df['DateTime'] + pd.Timedelta(minutes=15)
Valleys and peaks in the price series are marked as support and resistance levels. Each function returns 1 if the candle at the given row is detected as a support or resistance level, and 0 otherwise. These levels are combined with the candlestick patterns to decide when to enter a trade.
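def support(df1, l, n1, n2):
    # Lows must not rise over the n1 candles leading into l
    for i in range(l-n1+1, l+1):
        if(df1.Low[i]>df1.Low[i-1]):
            return 0
    # Lows must not fall over the n2 candles after l
    for i in range(l+1,l+n2+1):
        if(df1.Low[i]<df1.Low[i-1]):
            return 0
    return 1
def resistance(df1, l, n1, n2):
    # Highs must not fall over the n1 candles leading into l
    for i in range(l-n1+1, l+1):
        if(df1.High[i]<df1.High[i-1]):
            return 0
    # Highs must not rise over the n2 candles after l
    for i in range(l+1,l+n2+1):
        if(df1.High[i]>df1.High[i-1]):
            return 0
    return 1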
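To see what these checks accept, the sketch below applies the same comparison to a plain Python list of lows. The helper name `is_support` and the sample prices are hypothetical and chosen only for illustration.

```python
def is_support(lows, l, n1, n2):
    """Return 1 if lows do not rise over the n1 candles into index l
    and do not fall over the n2 candles after it, else 0."""
    for i in range(l - n1 + 1, l + 1):
        if lows[i] > lows[i - 1]:
            return 0
    for i in range(l + 1, l + n2 + 1):
        if lows[i] < lows[i - 1]:
            return 0
    return 1

# Hypothetical lows with a valley at index 2
lows = [105, 103, 101, 102, 104, 103]

# Only indices with n1 candles before and n2 candles after can be tested
flags = [is_support(lows, l, 2, 2) for l in range(2, len(lows) - 2)]
print(flags)
```

Because the second loop reads the `n2` candles after index `l`, a level at `l` can only be confirmed once those later candles have closed.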
The list of parameters used for identification is given below.
length = len(df)
high = list(df['High'])
low = list(df['Low'])
close = list(df['Close'])
open = list(df['Open'])
bodydiff = [0] * length
highWick = [0] * length
lowWick = [0] * length
ratio1 = [0] * length
ratio2 = [0] * length
The engulfing candlestick pattern is a significant and easily recognizable candlestick formation that provides insights into potential trend reversals. There are two types of engulfing patterns: bullish engulfing and bearish engulfing.
def isEngulfing(row):
    # Bearish Engulfing Pattern
    if (open[row-1]<close[row-1] and
        open[row]>close[row] and
        open[row]>=close[row-1] and close[row]<open[row-1]):
        return 1
    # Bullish Engulfing Pattern
    elif(open[row-1]>close[row-1] and
        open[row]<close[row] and
        open[row]<=close[row-1] and close[row]>open[row-1]):
        return 2
    else:
        return 0
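As a quick check of the conditions, the sketch below restates the same test on four explicit prices instead of the global lists. The function name `engulfing` and the prices are hypothetical.

```python
def engulfing(prev_open, prev_close, cur_open, cur_close):
    """Return 1 for a bearish engulfing pair, 2 for a bullish one, 0 otherwise."""
    # Bearish: an up candle followed by a down candle whose body covers it
    if (prev_open < prev_close and cur_open > cur_close
            and cur_open >= prev_close and cur_close < prev_open):
        return 1
    # Bullish: a down candle followed by an up candle whose body covers it
    if (prev_open > prev_close and cur_open < cur_close
            and cur_open <= prev_close and cur_close > prev_open):
        return 2
    return 0

bearish = engulfing(100, 102, 103, 99)   # down candle opens above 102, closes below 100
bullish = engulfing(102, 100, 99, 103)   # up candle opens below 100, closes above 102
neither = engulfing(100, 102, 102, 101)  # down candle does not close below 100
print(bearish, bullish, neither)
```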
A Doji is a candlestick pattern characterized by a small or nonexistent body, indicating that the opening and closing prices are virtually equal. It typically has long wicks (shadows) that exceed the size of the candle's body. When the wick on one side is notably shorter than the other, it can signal a potential reversal. The function below returns 1 for a bearish Doji (long upper wick), 2 for a bullish Doji (long lower wick), and 0 otherwise.
def isDoji(row):
    highWick[row] = high[row]-max(open[row],close[row])
    lowWick[row] = min(open[row],close[row])-low[row]
    bodydiff[row] = abs(open[row]-close[row])
    # Bearish: upper wick longer than both the body and the lower wick
    if highWick[row]>bodydiff[row] and lowWick[row]<highWick[row]:
        return 1
    # Bullish: lower wick longer than both the body and the upper wick
    elif lowWick[row]>bodydiff[row] and highWick[row]<lowWick[row]:
        return 2
    else:
        return 0
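The wick comparison can be tried on single candles. The sketch below is a standalone restatement of the same rule; the name `wick_signal` and the three candles are hypothetical.

```python
def wick_signal(o, h, l, c):
    """Return 1 if the upper wick dominates, 2 if the lower wick dominates, else 0."""
    high_wick = h - max(o, c)
    low_wick = min(o, c) - l
    body = abs(o - c)
    if high_wick > body and low_wick < high_wick:
        return 1
    if low_wick > body and high_wick < low_wick:
        return 2
    return 0

long_upper = wick_signal(100, 110, 99, 101)  # upper wick 9, lower wick 1, body 1
long_lower = wick_signal(100, 101, 90, 99)   # upper wick 1, lower wick 9, body 1
symmetric = wick_signal(100, 105, 95, 100)   # equal wicks, no body
print(long_upper, long_lower, symmetric)
```

Note that the rule compares each wick with the body rather than capping the body size, so a candle with a sizeable body still qualifies when one wick is longer than both the body and the opposite wick. A perfectly symmetric Doji produces no signal.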
The functions below determine whether the candle at row l of the dataframe is close to any support or resistance (S/R) level in the provided list of levels. The parameter lim defines the width of the proximity zone around a level within which the price is considered to be at that level.
def closeResistance(l,levels,lim):
    if len(levels)==0:
        return 0
    # Resistance level nearest to the candle's high
    nearest = min(levels, key=lambda x: abs(x - df.High[l]))
    # Condition 1: the high price is within the proximity zone of that level
    c1 = abs(df.High[l] - nearest) <= lim
    # Condition 2: the higher of open/close is within the proximity zone of that level
    c2 = abs(max(df.Open[l], df.Close[l]) - nearest) <= lim
    # Condition 3: the lower of open/close is below that level
    c3 = min(df.Open[l], df.Close[l]) < nearest
    # Condition 4: the low price is below that level
    c4 = df.Low[l] < nearest
    if (c1 or c2) and c3 and c4:
        return 1
    else:
        return 0
def closeSupport(l,levels,lim):
    if len(levels)==0:
        return 0
    # Support level nearest to the candle's low
    nearest = min(levels, key=lambda x: abs(x - df.Low[l]))
    # Condition 1: the low price is within the proximity zone of that level
    c1 = abs(df.Low[l] - nearest) <= lim
    # Condition 2: the lower of open/close is within the proximity zone of that level
    c2 = abs(min(df.Open[l], df.Close[l]) - nearest) <= lim
    # Condition 3: the higher of open/close is above that level
    c3 = max(df.Open[l], df.Close[l]) > nearest
    # Condition 4: the high price is above that level
    c4 = df.High[l] > nearest
    if (c1 or c2) and c3 and c4:
        return 1
    else:
        return 0
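The four conditions are easier to follow on a single candle. The sketch below restates the resistance check without the dataframe; the name `near_resistance`, the candle values, and the two levels are hypothetical.

```python
def near_resistance(o, h, l, c, levels, lim):
    """Return 1 if the candle touches the resistance level nearest its high."""
    if not levels:
        return 0
    nearest = min(levels, key=lambda x: abs(x - h))
    c1 = abs(h - nearest) <= lim          # high is inside the zone
    c2 = abs(max(o, c) - nearest) <= lim  # top of the body is inside the zone
    c3 = min(o, c) < nearest              # bottom of the body is below the level
    c4 = l < nearest                      # low is below the level
    return 1 if (c1 or c2) and c3 and c4 else 0

levels = [23150, 23200]
touching = near_resistance(23140, 23152, 23130, 23135, levels, 3)  # high is 2 points from 23150
too_far = near_resistance(23140, 23160, 23130, 23135, levels, 3)   # high is 10 points from 23150
no_levels = near_resistance(23140, 23152, 23130, 23135, [], 3)
print(touching, too_far, no_levels)
```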
1: Bearish signal at resistance with Bearish Doji or Engulfing.
2: Bullish signal at support with Bullish Doji or Engulfing.
0: No trading pattern detected.

n1=2
n2=2
backCandles=50
signal = [0] * length
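for row in range(backCandles, len(df)-n2):
    ss = []
    rr = []
    # Collect the support and resistance levels found in the last backCandles candles.
    # Note: support() and resistance() read the n2 candles after subrow, so the levels
    # closest to row are confirmed using candles that come after row.
    for subrow in range(row-backCandles+n1, row+1):
        if support(df, subrow, n1, n2):
            ss.append(df.Low[subrow])
        if resistance(df, subrow, n1, n2):
            rr.append(df.High[subrow])
    if ((isEngulfing(row)==1 or isDoji(row)==1) and closeResistance(row, rr, 3)):
        signal[row] = 1
    elif((isEngulfing(row)==2 or isDoji(row)==2) and closeSupport(row, ss, 3)):
        signal[row] = 2
    else:
        signal[row] = 0
df['signal']=signal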
class Backtester:
    def __init__(self):
        # Initialize strategy parameters and variables
        self.long_position = {'active': False, 'entry_price': 0.0, 'exit_price': 0.0, 'trade_pnl': 0.0}
        self.short_position = {'active': False, 'entry_price': 0.0, 'exit_price': 0.0, 'trade_pnl': 0.0}
        self.current_pnl = 0.0 # Track current pnl for the active position
        self.pnl = 0.0
        self.total_pnl = []
        self.trades = []
    def execute_trade(self, trade_type, entry_price, entry_time):
        # Execute a trade and update relevant variables
        trade = {'type': trade_type, 'entry_price': entry_price, 'entry_time': entry_time,
                 'exit_price': None, 'exit_time': None, 'trade_pnl': 0.0}
        self.trades.append(trade)
        if trade_type == 'long':
            self.long_position['active'] = True
            self.long_position['entry_price'] = entry_price
        elif trade_type == 'short':
            self.short_position['active'] = True
            self.short_position['entry_price'] = entry_price
        else:
            trade_pnl = 0
        self.total_pnl.append(self.pnl)
    def close_position(self, exit_price,exit_time):
        # Close the active position and update relevant variables
        if self.long_position['active']:
            self.long_position['active'] = False
            self.long_position['exit_price'] = exit_price
            self.long_position['trade_pnl'] = 100 * (exit_price - self.long_position['entry_price']) - 750
            self.pnl += self.long_position['trade_pnl']
            self.current_pnl = 0.0 # Reset current pnl after exiting
            # Update exit price and pnl in the trades list
            self.trades[-1]['exit_price'] = exit_price
            self.trades[-1]['exit_time'] = exit_time
            self.trades[-1]['trade_pnl'] = self.long_position['trade_pnl']
        elif self.short_position['active']:
            self.short_position['active'] = False
            self.short_position['exit_price'] = exit_price
            self.short_position['trade_pnl'] = -100 * (exit_price - self.short_position['entry_price']) - 750
            self.pnl += self.short_position['trade_pnl']
            self.current_pnl = 0.0 # Reset current pnl after exiting
            # Update exit price and pnl in the trades list
            self.trades[-1]['exit_price'] = exit_price
            self.trades[-1]['exit_time'] = exit_time
            self.trades[-1]['trade_pnl'] = self.short_position['trade_pnl']
        self.total_pnl.append(self.pnl)
    def update_current_pnl(self, current_price):
        # Update the current pnl for the active position
        if self.long_position['active']:
            self.current_pnl = 100 * (current_price - self.long_position['entry_price']) - 750
        elif self.short_position['active']:
            self.current_pnl = -100 * (current_price - self.short_position['entry_price']) - 750
        else:
            self.current_pnl = 0
    def get_summary(self):
        # Return a summary of the strategy's performance
        summary = {
            'total_pnl': self.total_pnl,
            'trades': self.trades,
            'pnl': self.pnl,
            'current_pnl': self.current_pnl
        }
        return summary
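The per-trade PnL in the class above reduces to one formula: a quantity of 100 times the price move in the trade's direction, minus 750. The sketch below isolates that arithmetic. Treating the 750 as a flat cost per round trip is an assumption, and the names `QTY`, `COST`, and `trade_pnl` are hypothetical.

```python
QTY = 100   # units per trade, as hard-coded in the Backtester
COST = 750  # fixed deduction per trade (assumed to be a flat round-trip cost)

def trade_pnl(side, entry_price, exit_price):
    """PnL of one closed trade; side is 'long' or 'short'."""
    direction = 1 if side == 'long' else -1
    return direction * QTY * (exit_price - entry_price) - COST

long_win = trade_pnl('long', 23100, 23120)        # 20 points in favour
short_loss = trade_pnl('short', 23100, 23120)     # 20 points against
break_even = trade_pnl('long', 23100.0, 23107.5)  # 7.5 points covers the deduction
print(long_win, short_loss, break_even)
```

Under these numbers a trade needs a favourable move of 7.5 points just to cover the fixed deduction.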
# Create a Backtester instance
backtester = Backtester()
capital = []
# Parameters
RRR = 1
df['Time'] = df['DateTime'].dt.time
df['Date'] = df['DateTime'].dt.date.astype(str)
time = pd.to_datetime('15:30').time()
for i, data in df.iterrows():
    backtester.update_current_pnl(data['Close'])
    # Check if there is an active trade
    # Long Trades
    if backtester.long_position['active'] and (data['Close'] >= TP_l or data['Close'] <= SL_l):
        # Close the position if SL or TP is hit
        backtester.close_position(data['Close'],data['DateTime'])
    # Short Trades
    elif backtester.short_position['active'] and (data['Close'] <= TP_s or data['Close'] >= SL_s):
        # Close the position if SL or TP is hit
        backtester.close_position(data['Close'],data['DateTime'])
    # Check for new signals
    signal = data['signal']
    # Bearish Trades
    if signal == 1 and not (backtester.short_position['active'] or backtester.long_position['active']) and not data['Time'] == time:
        SL_s = max(df['High'].iloc[i - 1:i + 1])
        TP_s = data['Close'] - RRR * (SL_s-data['Close'])
        backtester.execute_trade('short', data['Close'],data['DateTime'])
    # Bullish Trades
    elif signal == 2 and not (backtester.short_position['active'] or backtester.long_position['active']) and not data['Time'] == time:
        SL_l = min(df['Low'].iloc[i - 1:i + 1])
        TP_l = data['Close'] + RRR * (data['Close']-SL_l)
        backtester.execute_trade('long', data['Close'],data['DateTime'])
    # Close positions at the end of each day
    if data['Time'] == time:
        backtester.close_position(data['Close'],data['DateTime'])
    capital.append(backtester.pnl + backtester.current_pnl)
summary = backtester.get_summary()
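The stop-loss and take-profit levels in the loop come from the signal candle and the candle before it, scaled by the risk-reward ratio RRR. The sketch below shows that calculation on its own; the function names and prices are hypothetical.

```python
def short_levels(entry, prev_high, cur_high, rrr=1):
    """Stop above the higher of the last two highs; target rrr times that risk below entry."""
    sl = max(prev_high, cur_high)
    tp = entry - rrr * (sl - entry)
    return sl, tp

def long_levels(entry, prev_low, cur_low, rrr=1):
    """Stop below the lower of the last two lows; target rrr times that risk above entry."""
    sl = min(prev_low, cur_low)
    tp = entry + rrr * (entry - sl)
    return sl, tp

short_sl, short_tp = short_levels(23120, 23135, 23140)  # risk of 20 points
long_sl, long_tp = long_levels(23120, 23105, 23110)     # risk of 15 points
_, long_tp_2r = long_levels(23120, 23105, 23110, rrr=2)
print(short_sl, short_tp, long_sl, long_tp, long_tp_2r)
```

With RRR = 1, as in the backtest, the target sits exactly as far from the entry as the stop.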
# Create a Plotly figure
fig = go.Figure()
# Add a line trace for the different performances
fig.add_trace(go.Scatter(x=df['DateTime'], y=capital, mode='lines', name='Algo Performance',line=dict(color='blue')))
fig.add_trace(go.Scatter(x=df['DateTime'], y=(df['Close'][:]-df['Close'][0])*100, mode='lines', name='Buy and Hold',line=dict(color='green')))
# Customize the layout
fig.update_layout(
    title='Algo vs. Market: PnL Showdown',
    xaxis_title='Date',
    yaxis_title='Cumulative PnL',
    template='seaborn',
)
# Display plot
fig.show()
# PnL occurred in each trade
Trade_PnL = [round(trade.get('trade_pnl'),2) for trade in summary['trades']]
# The total PnL after each trade (starts from 0)
Total_PnL = summary['total_pnl']
# PnL for each trade: long and short positions
long_pnl = [round(trade.get('trade_pnl'),2) for trade in summary['trades'] if trade.get('type')=='long']
short_pnl = [round(trade.get('trade_pnl'),2) for trade in summary['trades'] if trade.get('type')=='short']
trade_count = len(Trade_PnL)
winning_count = sum([1 for trade in Trade_PnL if trade>0])
losing_count = sum([1 for trade in Trade_PnL if trade<0])
win_rate = round(100*winning_count/trade_count,2)
print(f"Total Trades: {trade_count}\nWinning Trades: {winning_count}\nLosing Trades: {losing_count}\nTotal Win Rate: {win_rate} %")
Total Trades: 1216
Winning Trades: 858
Losing Trades: 358
Total Win Rate: 70.56 %
long_count = len(long_pnl)
short_count = len(short_pnl)
print(f"Long Trades: {long_count}\nShort Trades: {short_count}")
Long Trades: 611
Short Trades: 605
long_winners = sum([1 for trade in long_pnl if trade>0])
long_losers = sum([1 for trade in long_pnl if trade<0])
long_winrate = round(100*long_winners/(long_winners+long_losers),2)
short_winners = sum([1 for trade in short_pnl if trade>0])
short_losers= sum([1 for trade in short_pnl if trade<0])
short_winrate = round(100*short_winners/(short_winners+short_losers),2)
print(f"Number of Winning Long Trades: {long_winners}\nNumber of Losing Long Trades: {long_losers}\nWinning Rate of Long Trades: {long_winrate} %\n\nNumber of Winning Short Trades: {short_winners}\nNumber of Losing Short Trades: {short_losers}\nWinning Rate of Short Trades: {short_winrate} %")
Number of Winning Long Trades: 417
Number of Losing Long Trades: 194
Winning Rate of Long Trades: 68.25 %

Number of Winning Short Trades: 441
Number of Losing Short Trades: 164
Winning Rate of Short Trades: 72.89 %
def max_streak(numbers):
    # Initialize variables to track consecutive losses and gains
    max_consecutive_losses = 0
    max_consecutive_gains = 0
    current_consecutive_losses = 0
    current_consecutive_gains = 0
    # Iterate through the list of numbers
    for num in numbers:
        # Check for consecutive losses
        if num < 0:
            current_consecutive_losses += 1
            current_consecutive_gains = 0
            max_consecutive_losses = max(max_consecutive_losses, current_consecutive_losses)
        # Check for consecutive gains
        elif num > 0:
            current_consecutive_losses = 0
            current_consecutive_gains += 1
            max_consecutive_gains = max(max_consecutive_gains, current_consecutive_gains)
    # Return the maximum consecutive losses and gains
    return max_consecutive_losses, max_consecutive_gains
# Calculate the longest losing and winning streaks
streak = max_streak(Trade_PnL)
print(f"Maximum Consecutive Wins: {streak[1]} Trades\nMaximum Consecutive Losses: {streak[0]} Trades")
Maximum Consecutive Wins: 17 Trades
Maximum Consecutive Losses: 5 Trades
max_gain = max(Trade_PnL)
max_loss = abs(min(Trade_PnL))
print(f"Largest Gain: {max_gain}\nLargest Loss: {max_loss}")
Largest Gain: 283461.26
Largest Loss: 223151.79
def max_drawdown(numbers):
    # Initialize variables to track consecutive drawdown phases
    max_consecutive_drawdown = 0
    current_consecutive_drawdown = 0
    peak_value = 0
    max_drawdown_value = 0
    # Iterate through the list of numbers
    for num in numbers:
        # Check for drawdown phase and update the phase and value
        if num < peak_value:
            current_consecutive_drawdown += 1
            max_consecutive_drawdown = max(max_consecutive_drawdown, current_consecutive_drawdown)
            max_drawdown_value = max(max_drawdown_value, peak_value - num)
        else:
            peak_value = num
            current_consecutive_drawdown = 0
    # Return the maximum consecutive drawdown phase and its value
    return max_consecutive_drawdown, max_drawdown_value
# Calculate the maximum drawdown phase and value
drawdown = max_drawdown(capital)
# The phase is counted in candles; 25 fifteen-minute candles make up one trading day
print(f"Maximum Drawdown Phase: {drawdown[0]//25} Trading Days\nMaximum Drawdown: {round(drawdown[1],2)}")
Maximum Drawdown Phase: 42 Trading Days
Maximum Drawdown: 380578.11
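The drawdown value is the largest gap between the equity curve and its running peak. The sketch below shows the same idea on a short, hypothetical equity list using only the standard library; it illustrates the calculation and is not the function used for the figures above.

```python
from itertools import accumulate

# Hypothetical cumulative PnL values, one per candle
equity = [0, 500, 300, 800, 200, 900]

# Running peak: the highest equity seen up to each point
running_peak = list(accumulate(equity, max))

# Drawdown at each point: distance below the running peak
drawdowns = [peak - value for peak, value in zip(running_peak, equity)]

max_dd = max(drawdowns)
print(running_peak, drawdowns, max_dd)
```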
net_pnl = round(sum(Trade_PnL),2)
gross_profit = round(sum(num for num in Trade_PnL if num > 0),2)
gross_loss = round(abs(sum(num for num in Trade_PnL if num < 0)),2)
try:
    profit_factor = round(gross_profit/gross_loss,2)
    print(f"Net PnL: {net_pnl}\nGross Profit: {gross_profit}\nGross Loss: {gross_loss}\nProfit Factor: {profit_factor}")
except ZeroDivisionError:
    profit_factor = 'Infinity'
    print(f"Net PnL: {net_pnl}\nGross Profit: {gross_profit}\nGross Loss: {gross_loss}\nProfit Factor: {profit_factor}")
Net PnL: 7860159.2
Gross Profit: 12492879.75
Gross Loss: 4632720.55
Profit Factor: 2.7
long_net_pnl = round(sum(long_pnl),2)
long_winners_pnl = round(sum([trade for trade in long_pnl if trade>0]),2)
long_losers_pnl = round(sum([trade for trade in long_pnl if trade<0]),2)
short_net_pnl = round(sum(short_pnl),2)
short_winners_pnl = round(sum([trade for trade in short_pnl if trade>0]),2)
short_losers_pnl= round(sum([trade for trade in short_pnl if trade<0]),2)
print(f"Net PnL of Long Trades: {long_net_pnl}\nGross Profit of Long Trades: {long_winners_pnl}\nGross Loss of Long Trades: {abs(long_losers_pnl)}\n\nNet PnL of Short Trades: {short_net_pnl}\nGross Profit of Short Trades: {short_winners_pnl}\nGross Loss of Short Trades: {abs(short_losers_pnl)}")
Net PnL of Long Trades: 3265336.81
Gross Profit of Long Trades: 5775813.01
Gross Loss of Long Trades: 2510476.2

Net PnL of Short Trades: 4594822.39
Gross Profit of Short Trades: 6717066.74
Gross Loss of Short Trades: 2122244.35
pnl_per_trade = round(net_pnl/trade_count,2)
print(f"Average Profit per Trade: {pnl_per_trade}")
Average Profit per Trade: 6463.95
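The headline figures can be cross-checked with a few lines of arithmetic. The sketch below only re-uses numbers printed in this report; the variable names are illustrative.

```python
# Figures copied from the outputs in this report
gross_profit = 12492879.75
gross_loss = 4632720.55
trade_count = 1216
winning_count = 858

# Net PnL is gross profit minus gross loss
net_pnl = round(gross_profit - gross_loss, 2)

# Profit factor is gross profit divided by gross loss
profit_factor = round(gross_profit / gross_loss, 2)

# Win rate and average PnL per trade
win_rate = round(100 * winning_count / trade_count, 2)
pnl_per_trade = round(net_pnl / trade_count, 2)

print(net_pnl, profit_factor, win_rate, pnl_per_trade)
```

The recomputed values agree with the reported net PnL, profit factor, win rate, and average profit per trade.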
This price-action-based 15-minute trading strategy performs strongly in the backtest: a net PnL of 7860159.2 over 1216 trades, a win rate of 70.56 %, a profit factor of 2.7, and a maximum drawdown of 380578.11. It outperforms the Buy and Hold strategy by nearly 1 Cr over the same period.
trades = summary['trades']
trade_df = pd.DataFrame(trades)
# Mapping 'type' to 'qty'
trade_df['qty'] = trade_df['type'].apply(lambda x: 100 if x == 'long' else -100)
# Formatting date and time columns
trade_df['entry_date'] = pd.to_datetime(trade_df['entry_time']).dt.strftime('%Y%m%d')
trade_df['exit_date'] = pd.to_datetime(trade_df['exit_time']).dt.strftime('%Y%m%d')
trade_df['entry_time'] = pd.to_datetime(trade_df['entry_time']).dt.strftime('%H%M')
trade_df['exit_time'] = pd.to_datetime(trade_df['exit_time']).dt.strftime('%H%M')
# Renaming columns to match the required format
trade_df = trade_df.rename(columns={'entry_date': 'entrydate', 'entry_time': 'entrytime', 'exit_date': 'exitdate', 'exit_time': 'exittime', 'entry_price': 'entryprice', 'exit_price': 'exitprice'})
# Keeping only the required columns
trade_df = trade_df[['qty', 'entrydate', 'entrytime', 'entryprice', 'exitdate', 'exittime', 'exitprice']]
# Save the DataFrame to a CSV file
trade_df.to_csv('SiddAdi_CandlestickSR_trades_15minTF.csv', index=False)
print("Trades saved to 'SiddAdi_CandlestickSR_trades_15minTF.csv'")
Trades saved to 'SiddAdi_CandlestickSR_trades_15minTF.csv'
equity_values = capital
drawdown_values = np.maximum.accumulate(equity_values) - equity_values
n = 50 # Get the curve for the nth trading day
# Create traces for equity and drawdown
trace_equity = go.Scatter(x=df[n*25:25*(n+1)].index, y=equity_values[n*25:25*(n+1)], name='Equity', line=dict(color='blue'))
trace_drawdown = go.Bar(x=df[n*25:25*(n+1)].index, y=-drawdown_values[n*25:25*(n+1)], name='Drawdown', marker=dict(color='red',opacity = 0.6), yaxis='y2')
# Create layout with secondary y-axis
layout = go.Layout(
    title='Equity and Drawdown Chart',
    yaxis=dict(title='Equity', color='blue'),
    yaxis2=dict(title='Drawdown', overlaying='y', side='right', color='red'),
    template = 'seaborn'
)
# Create figure
fig = go.Figure(data=[trace_equity, trace_drawdown], layout=layout)
# Show the figure
fig.show()